[Feature][Connector-V2] Add Huawei Cloud OBS connector #4578

Merged
merged 26 commits into from
Jun 15, 2024

kim-up
Contributor

@kim-up kim-up commented Apr 14, 2023

#4577

Purpose of this pull request

Check list

@TyrantLucifer
Member

Good pull request, let's wait for CI.

@kim-up kim-up changed the title from "[Feature][Connector-V2] Add Huawei Cloud OBS connector #4577" to "[Feature][Connector-V2] Add Huawei Cloud OBS connector" on Apr 14, 2023
@kim-up
Contributor Author

kim-up commented Apr 17, 2023

image

The esdk-obs-java dependency cannot be downloaded:
https://mvnrepository.com/artifact/com.huawei.storage/esdk-obs-java/3.19.7.3
image

@TyrantLucifer How should this be handled?

@TyrantLucifer
Member

You can find a solution in the oss-jindo module's pom file.

@@ -0,0 +1,260 @@
# ObsFile
Member

After discussion in the community, we have updated the format requirements for connector documentation; please refer to: https://github.com/apache/incubator-seatunnel/issues/4544

Contributor Author

Ok

@hailin0
Member

hailin0 commented Apr 17, 2023

Please add an e2e test case and mark the JUnit test as @Disabled.

@kim-up kim-up requested a review from EricJoy2048 April 18, 2023 12:24
@kim-up
Contributor Author

kim-up commented Apr 20, 2023

@TyrantLucifer @EricJoy2048 @hailin0 Is there anything else I need to do?

@kim-up
Contributor Author

kim-up commented Apr 22, 2023

@TyrantLucifer @hailin0 @EricJoy2048 @jbonofre Asking for help! Are there any end-to-end test examples for cloud file storage services such as OBS, OSS, and S3? These tests involve sensitive information such as the AK and SK, and cloud services do not necessarily have Docker images that can be used for mock testing.

@TyrantLucifer
Member

Hi @kim-up, you can add an e2e test case and run it in your local environment. Before submitting the code, add an annotation to disable the test and remove sensitive information from the config file. Finally, provide snapshot images in this pull request. You can refer to selectdb-e2e and file-local-e2e. The idea is to let newcomers quickly find the connector's test cases and run through the code framework you provide by changing only a few key values.
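
As a rough illustration of the "remove sensitive information from the config file" step, the sketch below masks credential values in a job config before it is committed. `ConfigRedactor` is a hypothetical helper, not part of SeaTunnel; the key names (`access_key`, `access_secret`, `bucket`) are taken from the configs posted in this thread, and the regex assumes each value is a double-quoted string on a single line.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of SeaTunnel): masks credential values in a job config. */
class ConfigRedactor {
    // Matches lines such as: access_key = "abc" or bucket="obs://name".
    // Assumption: values are double-quoted and sit on one line.
    private static final Pattern SENSITIVE =
            Pattern.compile("(?m)^(\\s*(?:access_key|access_secret|bucket)\\s*=\\s*)\"([^\"]*)\"");

    public static String redact(String config) {
        Matcher m = SENSITIVE.matcher(config);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = m.group(2);
            // Keep the obs:// scheme so the masked config still shows the expected format.
            String masked = value.startsWith("obs://") ? "obs://******" : "******";
            m.appendReplacement(out, Matcher.quoteReplacement(m.group(1) + "\"" + masked + "\""));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "sink {",
                "  ObsFile {",
                "    bucket = \"obs://my-bucket\"",
                "    access_key = \"AKEXAMPLE\"",
                "    access_secret=\"SKEXAMPLE\"",
                "    endpoint = \"obs.cn-east-3.myhuaweicloud.com\"",
                "  }",
                "}");
        // Prints the config with bucket, access_key and access_secret masked; endpoint is untouched.
        System.out.println(redact(config));
    }
}
```

Masking only the values keeps the config structure intact, so a reader can restore their own bucket and keys and run the disabled test locally.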

@kim-up
Contributor Author

kim-up commented Apr 23, 2023

Got it.

@kim-up
Contributor Author

kim-up commented Apr 24, 2023

Docs and e2e tests have been added. Could you start the workflow? @TyrantLucifer @EricJoy2048 @hailin0

@TyrantLucifer
Member

Approved, let's wait for CI/CD. BTW, could you please provide some snapshots of this connector?

@kim-up
Contributor Author

kim-up commented Apr 25, 2023

What do you mean by snapshots? A Docker image? Is there any related documentation I can refer to?

@kim-up
Contributor Author

kim-up commented Apr 26, 2023

Local Test Images

Csv Test

fake_to_obs_csv

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
  }
}

sink {
  ObsFile {
    path="/seatunnel/csv"
    bucket = "obs://********"
    access_key = "********"
    access_secret = "********"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    partition_dir_expression="${k0}=${v0}"
    is_partition_field_write_in_file=true
    file_name_expression="${transactionId}_${now}"
    file_format_type="csv"
    filename_time_format="yyyy.MM.dd"
    is_enable_transaction=true
  }
}
Result

image
image
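
To make the `file_name_expression` and `filename_time_format` options in the sink config above more concrete, here is a minimal sketch of how such an expression could be expanded. It assumes that `${now}` is replaced by the current time formatted with `filename_time_format` and that `${transactionId}` is replaced by the transaction ID. The class `FileNameExpressionDemo`, its `expand` method, and the sample transaction ID are hypothetical; this is an illustration, not SeaTunnel's actual implementation.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical illustration of expanding a file name expression; not SeaTunnel code. */
class FileNameExpressionDemo {
    public static String expand(String expression, String transactionId,
                                LocalDateTime now, String timeFormat) {
        // Format the timestamp with the pattern given in filename_time_format.
        String formattedNow = now.format(DateTimeFormatter.ofPattern(timeFormat));
        // Plain literal replacement of the two placeholders used in this thread.
        return expression
                .replace("${transactionId}", transactionId)
                .replace("${now}", formattedNow);
    }

    public static void main(String[] args) {
        String name = expand(
                "${transactionId}_${now}",              // file_name_expression from the config
                "T_demo_1",                             // made-up transaction ID
                LocalDateTime.of(2023, 4, 26, 10, 30),  // fixed time so the output is stable
                "yyyy.MM.dd");                          // filename_time_format from the config
        System.out.println(name); // prints T_demo_1_2023.04.26
    }
}
```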

obs_csv_projection_to_assert

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  ObsFile {
    path="/seatunnel/csv"
    bucket = "obs://******"
    access_key = "******"
    access_secret = "******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    result_table_name = "fake"
    file_format_type = csv
    delimiter = ","
    read_columns = [c_string, c_boolean]
    skip_header_row_number = 1
    schema = {
      fields {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
          c_row = {
            c_map = "map<string, string>"
            c_array = "array<int>"
            c_string = string
            c_boolean = boolean
            c_tinyint = tinyint
            c_smallint = smallint
            c_int = int
            c_bigint = bigint
            c_float = float
            c_double = double
            c_bytes = bytes
            c_date = date
            c_decimal = "decimal(38, 18)"
            c_timestamp = timestamp
          }
      }
    }
  }
}


sink {
  Assert {
    rules {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 5
        }
      ],
      field_rules = [
        {
          field_name = c_string
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = c_boolean
          field_type = boolean
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        }
      ]
    }
  }
}
Result

image
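
The Assert sink rules used in these tests can be read as two simple checks: `MAX_ROW` caps the number of rows received, and `NOT_NULL` requires a value in the named field of every row. The sketch below spells that reading out in plain Java. `AssertRulesSketch` and its methods are hypothetical names, rows are modeled as simple maps, and the code is only an illustration of the rule semantics as assumed here, not the connector's implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of the MAX_ROW and NOT_NULL rules; not SeaTunnel code. */
class AssertRulesSketch {
    /** MAX_ROW: true when the row count does not exceed maxRow. */
    public static boolean maxRowOk(List<Map<String, Object>> rows, int maxRow) {
        return rows.size() <= maxRow;
    }

    /** NOT_NULL: true when every row holds a non-null value for each listed field. */
    public static boolean notNullOk(List<Map<String, Object>> rows, List<String> fields) {
        for (Map<String, Object> row : rows) {
            for (String field : fields) {
                if (row.get(field) == null) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // One projected row containing only the columns listed in read_columns.
        Map<String, Object> row = new HashMap<>();
        row.put("c_string", "abc");
        row.put("c_boolean", Boolean.TRUE);
        List<Map<String, Object>> rows = Arrays.asList(row);

        System.out.println(maxRowOk(rows, 5));
        System.out.println(notNullOk(rows, Arrays.asList("c_string", "c_boolean")));
    }
}
```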

@kim-up
Contributor Author

kim-up commented Apr 26, 2023

Local Test Images

Excel Test

fake_to_obs_excel

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
  }
}

sink {
  ObsFile {
    path="/seatunnel/excel"
    bucket = "obs://******"
    access_key = "******"
    access_secret = "******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    partition_dir_expression="${k0}=${v0}"
    is_partition_field_write_in_file=true
    file_name_expression="${transactionId}_${now}"
    file_format_type="excel"
    filename_time_format="yyyy.MM.dd"
    is_enable_transaction=true
  }
}
Result

image

image

obs_excel_projection_to_assert

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  ObsFile {
    path="/seatunnel/excel"
    bucket = "obs://******"
    access_key = "******"
    access_secret = "******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    result_table_name = "fake"
    file_format_type = excel
    delimiter = ";"
    read_columns = [c_string, c_boolean]
    skip_header_row_number = 1
    schema = {
      fields {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
          c_row = {
            c_map = "map<string, string>"
            c_array = "array<int>"
            c_string = string
            c_boolean = boolean
            c_tinyint = tinyint
            c_smallint = smallint
            c_int = int
            c_bigint = bigint
            c_float = float
            c_double = double
            c_bytes = bytes
            c_date = date
            c_decimal = "decimal(38, 18)"
            c_timestamp = timestamp
          }
      }
    }
  }
}


sink {
  Assert {
    rules {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 5
        }
      ],
      field_rules = [
        {
          field_name = c_string
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = c_boolean
          field_type = boolean
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        }
      ]
    }
  }
}
Result

image

@kim-up
Contributor Author

kim-up commented Apr 26, 2023

Local Test Images

Json Test

fake_to_obs_file_json

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
    result_table_name = "fake"
  }
}

sink {
  ObsFile {
    path = "/seatunnel/json"
    bucket = "obs://******"
    access_key = "******"
    access_secret = "******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    row_delimiter = "\n"
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    file_name_expression = "${transactionId}_${now}"
    file_format_type = "json"
    filename_time_format = "yyyy.MM.dd"
    is_enable_transaction = true
  }
}
Result

image
image

obs_file_json_to_assert

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  ObsFile {
    path = "/seatunnel/json"
    bucket = "obs://******"
    access_key = "******"
    access_secret = "******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    file_format_type = "json"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
    result_table_name = "fake"
  }
}

sink {
  Assert {
    rules {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 5
        }
      ],
      field_rules = [
        {
          field_name = c_string
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = c_boolean
          field_type = boolean
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = c_double
          field_type = double
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        }
      ]
    }
  }
}
Result

image

@kim-up
Contributor Author

kim-up commented Apr 26, 2023

Local Test Images

Orc Test

fake_to_obs_file_orc

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
    result_table_name = "fake"
  }
}

sink {
  ObsFile {
    path = "/seatunnel/orc"
    bucket = "obs://*******"
    access_key = "*******"
    access_secret = "*******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    row_delimiter = "\n"
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    file_name_expression = "${transactionId}_${now}"
    file_format_type = "orc"
    filename_time_format = "yyyy.MM.dd"
    is_enable_transaction = true
    compress_codec = "zlib"
  }
}
Result

image
image

obs_file_orc_projection_to_assert

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  ObsFile {
    path = "/seatunnel/orc"
    bucket = "obs://******"
    access_key = "******"
    access_secret = "******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    file_format_type = "orc"
    read_columns = [c_string, c_boolean, c_double]
    result_table_name = "fake"
  }
}

sink {
  Assert {
    rules {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 5
        }
      ],
      field_rules = [
        {
          field_name = c_string
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = c_boolean
          field_type = boolean
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = c_double
          field_type = double
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        }
      ]
    }
  }
}
Result

image

@kim-up
Contributor Author

kim-up commented Apr 26, 2023

Local Test Images

Parquet Test

fake_to_obs_file_parquet

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
    result_table_name = "fake"
  }
}

sink {
  ObsFile {
    path = "/seatunnel/parquet"
    bucket = "obs://******"
    access_key = "******"
    access_secret = "******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    row_delimiter = "\n"
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    file_name_expression = "${transactionId}_${now}"
    file_format_type = "parquet"
    filename_time_format = "yyyy.MM.dd"
    is_enable_transaction = true
    compress_codec = "gzip"
  }
}
Result

image

image

obs_file_parquet_projection_to_assert

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  ObsFile {
    path = "/seatunnel/parquet"
    bucket = "obs://******"
    access_key = "******"
    access_secret = "******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    file_format_type = "parquet"
    read_columns = [c_string, c_boolean, c_double]
    result_table_name = "fake"
  }
}

sink {
  Assert {
    rules {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 5
        }
      ],
      field_rules = [
        {
          field_name = c_string
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = c_boolean
          field_type = boolean
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = c_double
          field_type = double
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        }
      ]
    }
  }
}
Result

image

@kim-up
Contributor Author

kim-up commented Apr 26, 2023

Local Test Images

Text Test

fake_to_obs_file_text

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
    result_table_name = "fake"
  }
}

sink {
  ObsFile {
    path = "/seatunnel/text"
    bucket = "obs://******"
    access_key = "******"
    access_secret = "******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    row_delimiter = "\n"
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    file_name_expression = "${transactionId}_${now}"
    file_format_type = "text"
    filename_time_format = "yyyy.MM.dd"
    is_enable_transaction = true
  }
}
Result

image

image

obs_file_text_projection_to_assert

env {
  execution.parallelism = 1
  checkpoint.interval = 5000
  job.mode = "BATCH"
}

source {
  ObsFile {
    path = "/seatunnel/text"
    bucket = "obs://******"
    access_key = "******"
    access_secret = "******"
    endpoint = "obs.cn-east-3.myhuaweicloud.com"
    file_format_type = "text"
    read_columns = [c_string, c_boolean, c_double]
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
    result_table_name = "fake"
  }
}

sink {
  Assert {
    rules {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 5
        }
      ],
      field_rules = [
        {
          field_name = c_string
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = c_boolean
          field_type = boolean
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        },
        {
          field_name = c_double
          field_type = double
          field_value = [
            {
              rule_type = NOT_NULL
            }
          ]
        }
      ]
    }
  }
}
Result

image

@kim-up
Contributor Author

kim-up commented Apr 26, 2023

@TyrantLucifer Did you mean the test screenshot provided above?

@ic4y
Contributor

ic4y commented Aug 6, 2023

@kim-up Please use mvn spotless:apply to format the code.

@kim-up
Contributor Author

kim-up commented Aug 8, 2023

Done. Fixed by merging dev.

image

@EricJoy2048
Member

Hi, @kim-up
You submitted a great PR, but it hasn't been updated in a while. Could you update it?

@hailin0 hailin0 merged commit d266f4d into apache:dev Jun 15, 2024
5 of 6 checks passed
chaorongzhi pushed a commit to chaorongzhi/seatunnel that referenced this pull request Aug 21, 2024
6 participants